Request marshaling error should not corrupt a channel #991
@lukebakken, I haven't forgotten - will catch up in the next couple of days.
@vitaly-krugl no hurry at all! Thanks again.
Something doesn't seem quite right to me with the fix. It might be treating one symptom with a work-around, but not the entire problem. I would expect RabbitMQ to react to the bad queue name in ch.queue_declare(queue=[1, 2, 3]) by sending Channel.Close to the client, in which case the channel should have been removed from Connection and BlockingConnection, and the final connection.close() call should not be sending Channel.Close on that channel at all during connection closing.
Does ch.queue_declare(queue=[1, 2, 3]) actually get sent to the broker, and does it elicit a Channel.Close response from the broker in this case?
@vitaly-krugl the
I'll try that out.
Don't try it out just yet, I missed something there (and the fix in the PR did, too, I think)
And code comments should provide some insight into each of these, so that a future contribution won't refactor or reorder things in a way that's incompatible with the solution.
@vitaly-krugl - I have merged in the tests you provided and this is ready for another review. Thanks!
Thanks @lukebakken, please see my feedback.
pika/channel.py
Outdated
@@ -1327,9 +1327,10 @@ def _on_synchronous_complete(self, _method_frame_unused):
        while self._blocked and self._blocking is None:
            self._rpc(*self._blocked.popleft())

    def _drain_blocked_methods_on_remote_close(self):
The change in this method (including its name) is not necessary for fixing the issue and breaks normal functionality, please revert the change in this method entirely including the name. Here is why: the user was and should be able to make a blocking legitimate request or a series of them followed by Channel.Close and not have those legitimate requests preempted. Preempting them would be overly-opinionated :). If we think we need an "emergency channel-close" method that purges queued-up requests (I don't think we do), then we should either have a different appropriately-named method or an optional arg that makes that explicit.
If we think we need an "emergency channel-close" method that purges queued-up requests (I don't think we do
Draining blocked methods on a broker-initiated close was introduced in #957 - please check that PR out again. I still think it's necessary.
pika/channel.py
Outdated
@@ -1397,6 +1398,13 @@ def _rpc(self, method, callback=None, acceptable_replies=None):
        if self.is_closed:
            self._raise_if_not_open()

        if self.is_closing and self._blocking:
We don't want/need this "if self.is_closing and self._blocking:" code block. It prevents the user from being able to queue up several blocking requests followed by channel.close() and having those requests complete normally.
pika/channel.py
Outdated
        # thrown
        try:
            self._send_method(method)
        except Exception as err:
I don't think the "except Exception as err:" block adds any value. The user should be able to glean the same information from the traceback produced by the failed self._send_method().
pika/channel.py
Outdated
        except Exception as err:
            if self._blocking:
                LOGGER.error(
                    "send_method failed for blocking method %s with %s, will discard",
It probably doesn't matter given my preceding comment, but if self._blocking was True, we wouldn't have made it this far.
pika/channel.py
Outdated
@@ -1405,6 +1413,19 @@ def _rpc(self, method, callback=None, acceptable_replies=None):
                self._blocked.append([method, callback, acceptable_replies])
                return

        # Note: _send_method can throw exceptions if there are framing errors
        # or invalid data passed in. Call it here, before the acceptable_replies
        # block to prevent callbacks from being registered if an exception is
... and, even more importantly, to prevent self._blocking from being erroneously set too (which was causing the deadlock).
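The ordering point made in this comment can be illustrated with a minimal sketch. This is not pika's code: MiniChannel, FakeTransport, and the "only strings can be marshaled" rule are hypothetical stand-ins. Only the ordering (attempt the send first, record the blocking state afterwards) mirrors what the review discusses.

```python
import collections


class FakeTransport(object):
    """Hypothetical stand-in for the connection."""

    def __init__(self):
        self.sent = []

    def send_method(self, method):
        # Assumption for this sketch: only strings can be marshaled.
        if not isinstance(method, str):
            raise AssertionError("cannot marshal %r" % (method,))
        self.sent.append(method)


class MiniChannel(object):
    """Hypothetical, simplified model of a channel's synchronous RPC state."""

    def __init__(self, transport):
        self._transport = transport
        self._blocking = None                 # method awaiting a reply, if any
        self._blocked = collections.deque()   # requests queued behind it

    def rpc(self, method):
        if self._blocking is not None:
            self._blocked.append(method)
            return
        # Send first: if marshaling raises, none of the state below is touched.
        self._transport.send_method(method)
        self._blocking = method


transport = FakeTransport()
channel = MiniChannel(transport)

try:
    channel.rpc([1, 2, 3])        # invalid request, marshaling fails
except AssertionError:
    pass
blocking_after_failure = channel._blocking

channel.rpc("Queue.Declare")      # the channel is still usable
```

If the blocking state were recorded before the send, the failed request would leave the channel waiting for a reply that can never arrive, and every later request would pile up in the blocked queue.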
pika/connection.py
Outdated
        frame_header = frame.Header(channel_number, length, content[0])
        frame_method_marshaled = frame_method.marshal()
        frame_header_marshaled = frame_header.marshal()
        self._output_marshaled_frame(frame_method_marshaled)
This sequence of calls (as well as the one below) still permits a partial request to be placed on the output buffer, thus corrupting the output stream. What needs to happen is that all the marshaled frames are appended to a list. Only when the entire message has been successfully marshaled should we go ahead and place the marshaled frames on the output buffer.
Also, I find that the single-use intermediate variables as used here detract from code readability in Python. They have their place when the value being computed would make an all-in-one statement difficult to read, but I don't think that's the case here. Same in _send_frame(). Here is an example of what I have in mind:
    marshaled_frames = []
    marshaled_frames.append(
        frame.Method(channel_number, method_frame).marshal())
    marshaled_frames.append(
        frame.Header(channel_number, length, content[0]).marshal())
    # And below...
    if content[1]:
        # ...
        for chunk in xrange(0, chunks):
            # ...
            marshaled_frames.append(
                frame.Body(channel_number, content[1][start:end]).marshal())
    self._output_marshaled_frames(marshaled_frames)
Oh yeah, as you can see from my code snippet above, I now think it's better to have an _output_marshaled_frames() (note plural) method instead of _output_marshaled_frame() in order to avoid _detect_backpressure() calls on a per-frame basis. _send_frame() can just wrap its single frame in a list when calling _output_marshaled_frames().
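The "marshal everything first, then touch the output buffer" idea from this comment can be tried out in isolation with a small, self-contained sketch. The Frame class, its byte layout, and send_message() below are hypothetical and much simpler than pika's frame classes. The point is only that a marshaling failure part-way through a message leaves the buffer exactly as it was.

```python
import struct


class Frame(object):
    """Hypothetical frame: a type byte, a channel number, and a payload."""

    def __init__(self, frame_type, channel_number, payload):
        self.frame_type = frame_type
        self.channel_number = channel_number
        self.payload = payload

    def marshal(self):
        # Assumption for this sketch: only bytes payloads can be marshaled.
        if not isinstance(self.payload, bytes):
            raise TypeError("payload must be bytes, got %r" % type(self.payload))
        prefix = struct.pack(">BHI", self.frame_type, self.channel_number,
                             len(self.payload))
        return prefix + self.payload


def send_message(outbound_buffer, channel_number, method, header, body,
                 max_chunk=4):
    """Marshal every frame first; extend outbound_buffer only on success."""
    marshaled_frames = [
        Frame(1, channel_number, method).marshal(),
        Frame(2, channel_number, header).marshal(),
    ]
    for start in range(0, len(body), max_chunk):
        chunk = body[start:start + max_chunk]
        marshaled_frames.append(Frame(3, channel_number, chunk).marshal())
    # Reached only if nothing above raised.
    outbound_buffer.extend(marshaled_frames)


buffer = []
send_message(buffer, 1, b"Basic.Publish", b"props", b"hello world")
frames_after_good = len(buffer)

try:
    # The method and header marshal fine, the body does not.
    send_message(buffer, 1, b"Basic.Publish", b"props", [1, 2, 3])
except TypeError:
    pass
frames_after_bad = len(buffer)
```

With per-frame output, the second call would have left a method frame and a header frame in the buffer with no body to follow them.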
@@ -2266,8 +2261,15 @@ def _send_message(self, channel_number, method_frame, content):
Incidentally, the method_frame parameter to _send_message() is misnamed in this legacy code. What's passed in should be called method. The method_frame is created inside the function when it constructs frame.Method.
pika/channel.py
Outdated
        # Note: This is normally called when a synchronous command is
        # completed. It will undo the blocking state and send all the
        # frames that stacked up while we were in the blocking state.
        self._on_synchronous_complete(None)
@lukebakken, we really don't want this. If there is a blocking command, then Channel.Close will get appended to self._blocked just like any other blocked command. When the command(s) that blocked complete, _on_synchronous_complete, which is separately registered as a callback in _rpc(), will allow the Channel.Close to do its work. This self._on_synchronous_complete(None) call here breaks the synchronous RPC workflow by allowing a new RPC method to be sent before the previous one completes.
The problem we're trying to fix is "Request marshaling error should not corrupt a channel". The only reason the channel becomes corrupted is the marshaling issue, which the rest of this PR addresses.
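The synchronous workflow described in these two comments can be modeled in a few lines. SyncRpcQueue is a hypothetical toy, not pika's Channel. It assumes every request is synchronous and that replies arrive in order, and it only shows that a Channel.Close queued behind blocking requests goes out after they complete, with no forced unblocking.

```python
import collections


class SyncRpcQueue(object):
    """Hypothetical model: at most one synchronous request in flight."""

    def __init__(self):
        self.wire = []                       # methods actually sent, in order
        self._blocking = None
        self._blocked = collections.deque()

    def rpc(self, method):
        if self._blocking is not None:
            # Something is in flight: wait in line, Channel.Close included.
            self._blocked.append(method)
            return
        self.wire.append(method)
        self._blocking = method

    def on_reply(self):
        """The reply for the in-flight method arrived; release the next one."""
        self._blocking = None
        if self._blocked:
            self.rpc(self._blocked.popleft())


queue = SyncRpcQueue()
queue.rpc("Queue.Declare")
queue.rpc("Queue.Bind")
queue.rpc("Channel.Close")
sent_before_replies = list(queue.wire)

queue.on_reply()    # reply to Queue.Declare releases Queue.Bind
queue.on_reply()    # reply to Queue.Bind releases Channel.Close
sent_after_replies = list(queue.wire)
```

Clearing the blocking state by hand while a request is still in flight would let the next queued method out early, which is the breakage the comment warns about.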
pika/connection.py
Outdated
@@ -2266,8 +2261,14 @@ def _send_message(self, channel_number, method_frame, content):

        """
        length = len(content[1])
        self._send_frame(frame.Method(channel_number, method_frame))
        self._send_frame(frame.Header(channel_number, length, content[0]))
        marshaled_body_frames = collections.deque()
A specialized deque class isn't needed here. A simple list will do. The marshaled_body_frames.popleft() logic in the loop at the bottom adds no value and is less efficient than a simple loop iteration. Keep in mind that when the list goes out of scope, Python will automatically deref the list and all its items inside C code, versus popping here one at a time.
pika/connection.py
Outdated
            marshaled_body_frames.append(frame_body.marshal())

        while marshaled_body_frames:
            self._output_marshaled_frame(marshaled_body_frames.popleft())
Per my previous comment, the popping is unnecessary and adds inefficiency. The following is preferred:
    for chunk in marshaled_body_frames:
        self._output_marshaled_frame(chunk)
However, it's better to do this instead in order to avoid unnecessary per-chunk overhead inside self._output_marshaled_frame(), namely self._flush_outbound() and self._detect_backpressure():
    self._output_marshaled_frames(marshaled_body_frames)
_output_marshaled_frames() should append all the chunks to self.outbound_buffer while updating self.bytes_sent and self.frames_sent (again no need to pop, see the "for chunk in marshaled_body_frames:" example above), and then perform self._flush_outbound() and "if self.params.backpressure_detection: self._detect_backpressure()" just once, outside the loop, at the end of the _output_marshaled_frames() method.
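As a rough sketch of the batched output method described above (not the code that was merged): OutputSketch is a hypothetical stand-in for the connection, its flush and backpressure methods only count calls, and the backpressure flag is a plain attribute here rather than self.params.backpressure_detection.

```python
class OutputSketch(object):
    """Hypothetical stand-in for the connection's output path."""

    def __init__(self, backpressure_detection=False):
        self.outbound_buffer = []
        self.bytes_sent = 0
        self.frames_sent = 0
        self.flush_calls = 0
        self.backpressure_checks = 0
        self.backpressure_detection = backpressure_detection

    def _flush_outbound(self):
        self.flush_calls += 1           # placeholder for the real flush

    def _detect_backpressure(self):
        self.backpressure_checks += 1   # placeholder for the real check

    def _output_marshaled_frames(self, marshaled_frames):
        # Plain iteration: no popping, the caller's list is left untouched.
        for marshaled_frame in marshaled_frames:
            self.bytes_sent += len(marshaled_frame)
            self.frames_sent += 1
            self.outbound_buffer.append(marshaled_frame)
        # Once per batch instead of once per frame.
        self._flush_outbound()
        if self.backpressure_detection:
            self._detect_backpressure()

    def _send_frame(self, marshaled_frame):
        # A single frame is just a batch of one.
        self._output_marshaled_frames([marshaled_frame])


out = OutputSketch(backpressure_detection=True)
out._output_marshaled_frames([b"ab", b"cde", b"f"])
out._send_frame(b"gh")
```

Three body chunks cost one flush and one backpressure check here, where the per-frame version would have paid for three of each.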
@lukebakken, I renamed this PR "Request marshaling error should not corrupt a channel", which reflects issues #990 and #912 more accurately.
On the broker's Channel.Close, the draining is necessary because AMQP says to ignore all incoming requests after the channel is closed, except Channel.Close. So, draining in that case helps break the gridlock.
However, when the client is closing the channel with some blocking requests still pending normally, we have a perfectly healthy channel and nothing special is needed. The normal course of events will see it through.
@lukebakken, is this PR ready for re-review?
Let's add a regression test in blocking connection acceptance tests from issue #990, making sure that this raises the expected exception:
    with pika.BlockingConnection() as connection:
        channel = connection.channel()
        channel.queue_declare(queue=[1, 2, 3])
@vitaly-krugl if I re-select your name in the "Reviewers" dropdown, the status icon changes back to an orange disk ... do you not get a new email saying I re-requested a review? I assumed that you did. If you don't get an email, I can @-mention you in a comment. Thanks for the re-re-reviews 😄
I think I might not have ended that review ??
@lukebakken, could you please add a regression test in blocking connection acceptance tests from issue #990, making sure that this raises the expected exception from the bad channel.queue_declare()?
    with pika.BlockingConnection() as connection:
        channel = connection.channel()
        channel.queue_declare(queue=[1, 2, 3])
@vitaly-krugl thanks! Sorry I missed the previous comment about that test.
Please see my feedback
@@ -271,6 +270,16 @@ def test(self):
        self.assertFalse(ch._impl._consumers)


class TestUsingInvalidQueueArgument(BlockingTestCaseBase):
    def test(self):
        """BlockingConnection raises expected connection when invalid queue parameter is used
"expected connection" ==> "expected exception"
        """
        connection = self._connect()
        ch = connection.channel()
        with self.assertRaises(AssertionError):
Surprising that the underlying implementation raises AssertionError instead of TypeError/ValueError. But no need to fix that.
pika/connection.py
Outdated
        self._send_frame(frame.Header(channel_number, length, content[0]))
        marshaled_body_frames = []

        # Note: we construct the Method and Header objects, marshal them
... Method, Header, and Content objects, ...
…method_error() and connection_tests.test_no_side_effects_from_message_marshal_error().
…ds differently on client-requested close Add test for passing an invalid parameter as the queue name
@vitaly-krugl I made your latest suggested changes and rebased against
LGTM - thank you!
Thanks @vitaly-krugl
Fixes #990 and #912